Wang Haihua
🍈 🍉🍊 🍋 🍌
强化学习是现今机器学习领域中,最令人兴奋并且最古老的研究领域。从20世纪50年代至今,强化学习领域诞生出了许多有趣的应用,尤其在游戏领域以及机器控制领域,但它从未获得过多的关注。直到2013年,DeepMind演示了一个可以从零开始学习玩任何雅达利游戏(Atari game)的系统,其在游戏中的表现超越了大多数的人类。有趣的是,该系统仅仅依靠原始的游戏画面像素作为输入,并不具备该游戏的任何先验知识;2016年,DeepMind开发的基于深度强化学习的AlphaGo系统击败了两名职业围棋选手李世石和柯洁。2014年, DeepMind以5亿美元的价格被Google收购。
DeepMind实现这些成就的根本原因是将深度学习的方法引入了强化学习领域。本章将首先介绍强化学习的基本概念以及其擅长解决的问题,然后展示深度强化学习(Deep Reinforcement Learning)中的两种基本技术:policy gradients 和 deep Q-networks(DQNs), 其中涉及对马尔可夫决策过程(Markov decision processes, MDPs)的讨论。在讨论过程当中,我们将使用提到的技术分别训练模型来平衡移动小车上的杆子,从而展示其性能;然后我们会介绍TF-Agents库,其中集成了最先进的强化学习算法并简化了强化学习系统的搭建;进一步,我们会使用TF-Agents训练著名的雅达利游戏Breakout。最终我们会探讨强化学习领域的研究前沿。
mse_test = model.evaluate(X_test, y_test)
X_new = X_test[:3]
y_pred = model.predict(X_new)
定义:强化学习中,一个agent通过在环境(environments)中进行观察(observations)并采取行动(actions)来获取奖励(rewards)。其学习目标是找到一种行动策略(policy)使得其在时间尺度上获取的平均奖励最大化。学习过程类似于生物体的趋利避害,agents在学习的过程中通过不断试错(trails and error)来最大化“利”而最小化”害“。
应用:在以上定义下,强化学习可以应用于广泛的研究任务。例如:
强化学习中,Agents用以决定其行为的算法称为策略(Policy)。最简单的,一个神经网络算法如果将observations作为输入,将action作为输出,那么该神经网络算法就是一个Policy。Policy可以是任何算法,并且不需要是确定性的算法,甚至不需要通过observations来得到。以吸尘器机器人为例:
机器人Alice的reward是其30分钟内吸起的灰尘总量。其policy可以是每秒钟以概率p直接前行,或者以概率1-p旋转角度r后前进。由于这个policy涉及到随机性,因此称为随机策略(stochastic policy)。在这个policy下,Alice的轨迹是不确定的,但这也保证了Alice会遍历屋子里的所有地面。问题是,这个policy的参数p和r如何设定才会让Alice在30分钟内吸起灰尘量最大呢?
这个policy的学习涉及到p和r两个参数的学习。一种简单的学习办法是尝试大量的参数取值,选取最终效果最好的,但这种策略搜索方法太暴力也太低效,当策略空间很大的时候,这种方法无异于大海捞针。另一种办法是使用遗传算法探索策略空间。例如,随机初始化100个策略,去除其中最差的80个,剩余的20个每个再繁衍4个子代,繁衍子代的方法是复制父代再添加一定的随机变异,持续繁衍多代直到找到符合标准的策略。还有一种方案是使用优化方法,也就是计算reward关于参数的梯度,利用梯度上升法进行policy的搜索,这种方法称为policy gradients(PG)。回到吸尘器的例子,可以尝试轻微的增大p的取值,如果reward因此而增加,那么就增大p, 反之则减小。接下来,我们将利用TensorFlow实现一个基本的PG方法,但在此之前,我们首先基于OpenAI Gym 搭建一个可以让agent存在的环境。
强化学习一个重要挑战就是在训练一个agent的时候,需要创建一个工作环境。比如,我们想要用强化学习来训练agent玩Atari游戏,则需要一个Atari游戏模拟器。换句话说,训练在现实场景中是非常艰难和缓慢的,因此通常需要一个仿真环境。例如,使用PyBullet或者MuJoCo来进行3D物理建模。
OpenAI Gym是一个提供了大量仿真环境的工具箱(Atari游戏,棋类游戏,2D或3D建模等),基于此我们可以训练Agents,对它们进行对比,或者设计新的强化学习算法。
在安装OpenAI Gym之前,最好创建一个独立的虚拟环境并激活(命令行代码如下):
$ conda create -n env_name
# env_name为创建用以安装Gym的虚拟环境,可自行设定,如:conda create -n my_gym
$ conda install python==3.7
# 安装所需要的python版本
$ activate env_name
# 激活创建的虚拟环境
$ pip install --upgrade gym
# 安装并更新到最新版本的Gym
安装好Gym之后,就可以在python文件中导入并创建第一个游戏环境了(以下我们创建一个”小车承杆“的环境):
import gym
env = gym.make("CartPole-v1")
obs = env.reset()
obs
array([0.02766288, 0.00910654, 0.03462887, 0.02690522])
通过调用render()方法可以展示这个虚拟环境:
env.render()
True
如果想以numpy数组的形式输出render()命令得到的图像,那么可以设定mode="rgb_array":
img=env.render(mode="rgb_array")
img.shape
现在,我们查询environment中的action空间:
env.action_space
Discrete(2)
Discrete(2)的含义是在目前定义的CartPole的环境中只支持0和1两个操作,换句话说,action空间仅包含0和1,也就是要么向左加速(0),要么向右加速(1)。在其他的环境中或许有更大的action空间,甚至是连续型的。由于此时小车上的杆子是向右倾斜的(obs[2]>0),因此我们尝试让小车向右加速:
action=1
obs,reward,done,info=env.step(action)
obs
array([ 0.02784501, 0.20371521, 0.03516697, -0.25465393])
Step()方法执行给定的action,并返回4个值:
obs
: 代表了新的observation. 目前,小车在向右移动(obs[1]>0),杆子仍然向右倾斜(obs[2]>0),但其角速度是负的(obs[3]<0),因此杆子很可能下一步就会偏向左边。
reward
:在这个环境当中,无论你在每一步采取了什么action,都会得到一个1.0的奖励,因此训练目标是使得这个episode一直向下运行,即杆子一直在小车上保持平衡。
done
: 代表当前episode是否结束,结束即为true。此环境中,episode结束的标志是:当杆子倾斜太多或者离开屏幕时,或者当前episode已经进行了200步(这意味着你已经赢了)。episode结束后,必须重置环境以进行下一个episode。
info
: 这个环境特定的字典负责提供一些关于debug或者训练的额外信息。比如,在某个游戏里面,它可能指出agent还有多少条命。
Tips:一旦某个environment使用结束,应该调用close()函数释放资源。
接下来我们实现一个简单的policy:当杆子偏向左时小车向左加速,偏向右时小车向右加速。然后基于此策略连续执行500个episodes.
def basic_policy(obs):
angle=obs[2]
return 0 if angle<0 else 1
totals=[]
for episode in range(500):
episode_rewards=0
obs=env.reset()
for step in range(200):
action=basic_policy(obs)
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
import numpy as np
np.mean(totals), np.std(totals),np.mean(totals),np.max(totals)
(42.234, 8.779250765298826, 42.234, 67.0)
可以看出,在现有策略(policy)下,即使进行了500次尝试,也没有一次成功让杆子在小车上方坚持超过68个step,这显然是一个糟糕的策略。接下来我们考虑能否利用神经网络找到更好的策略(policy)。
利用神经网络寻找policy的基本思想是:以当前observation为输入,下一步action为输出,中间是黑盒子。事实上,我们期望神经网络在action空间上输出一个概率分布, 而下一步的action就从这个概率分布中随机采样得到。通俗的讲,在当前小车承杆的仿真环境中,我们希望神经网络告诉我们小车向右加速和向左加速的概率分别是多大,然后根据具体的概率决定向左还是向右。比如,向左向右的概率分别是30%和70%,那么我们就以30%的概率向左,70%的概率向右。相比于直接比较概率大小然后决定向左向右的方法,这种依概率随机采样的方法保留了agent探索更多action的能力。
import tensorflow as tf
from tensorflow import keras
import numpy as np
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)
n_inputs=4 # == env.observation_space.shape[0]
model=keras.models.Sequential([
keras.layers.Dense(5,activation="elu",input_shape=[n_inputs]),
keras.layers.Dense(1,activation="sigmoid"),
])
def update_scene(num, frames, patch):
patch.set_data(frames[num])
return patch,
def plot_animation(frames, repeat=False, interval=40):
fig = plt.figure()
patch = plt.imshow(frames[0])
plt.axis('off')
anim = animation.FuncAnimation(
fig, update_scene, fargs=(frames, patch),
frames=len(frames), repeat=repeat, interval=interval)
plt.close()
return anim
以上我们基于keras的sequential模块建立了一个policy network。其中网络输入为观察空间的大小,网络隐含层为一个5个units组成的全连接层,输出为1维,代表小车向左加速的概率,如果action space的中有更多的action选择,可以选择设置更多的输出,并利用softmax进行归一化。
def render_policy_net(model, n_max_steps=200, seed=42):
frames = []
env = gym.make("CartPole-v1")
env.seed(seed)
np.random.seed(seed)
obs = env.reset()
for step in range(n_max_steps):
frames.append(env.render(mode="rgb_array"))
left_proba = model.predict(obs.reshape(1, -1))
action = int(np.random.rand() > left_proba)
obs, reward, done, info = env.step(action)
if done:
break
env.close()
return frames
frames = render_policy_net(model)
plot_animation(frames)
n_environments = 50
n_iterations = 5000
envs = [gym.make("CartPole-v1") for _ in range(n_environments)]
for index, env in enumerate(envs):
env.seed(index)
np.random.seed(42)
observations = [env.reset() for env in envs]
optimizer = keras.optimizers.RMSprop()
loss_fn = keras.losses.binary_crossentropy
for iteration in range(n_iterations):
# if angle < 0, we want proba(left) = 1., or else proba(left) = 0.
target_probas = np.array([([1.] if obs[2] < 0 else [0.])
for obs in observations])
with tf.GradientTape() as tape:
left_probas = model(np.array(observations))
loss = tf.reduce_mean(loss_fn(target_probas, left_probas))
print("\rIteration: {}, Loss: {:.3f}".format(iteration, loss.numpy()), end="")
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
actions = (np.random.rand(n_environments, 1) > left_probas.numpy()).astype(np.int32)
for env_index, env in enumerate(envs):
obs, reward, done, info = env.step(actions[env_index][0])
observations[env_index] = obs if not done else env.reset()
for env in envs:
env.close()
frames = render_policy_net(model)
plot_animation(frames)
如果已知每一步的最佳action是什么,我们就可以通过最小化目标概率分布与估计概率分布之间的交叉熵来训练神经网络,此时策略学习问题就成为了一个普通的有监督学习问题。然而,agents在强化学习中的指导往往只有稀疏并且存在延迟的奖励。试想,杆子在小车上坚持了100个steps,但agent如何知道在这100次action中具体哪些是好的,哪些是坏的。它只知道在最后一次action后,杆子从小车上掉了下去,但显然这个掉落不完全由最后一个action导致。换句话说,如果一个agent得到了某一个奖励,但它并不知道是因为具体哪些action激发了这个奖励,这个问题称为信用分配问题
。
为了解决信用分配问题
,即公正地评判一个action的好坏,一个常见的策略是基于该action之后所有奖励的和来评判,通常还会为每一步的奖励引入一个折旧因子$\gamma$来加权。这个基于折旧因子的加权奖励和被称为该action的return。试想,一个agent决定连续采取3次向右行进的动作,第一次得到了$+10$的奖励,第二次得到了0,第三次得到了$-50$的奖励。假设折旧因子$\gamma=0.8$,那么第一个action的return为$10+\gamma\times0+\gamma^{2}\times(-50)=-22$。如果折旧因子接近于0,未来的奖励相比于即时奖励几乎可以忽略不计;反之,如果折旧因子接近于1,那么未来获取的奖励和即时奖励几乎占据一样的权重。$\gamma$通常会取$0.9$到$0.99$之间的某个值。当$\gamma=0.95$,第13次action的reward比重将降低到$0.5$($0.95^{13}\approx0.5$),然而当$\gamma=0.99$,则需要69步,reward比重才会降低到$0.5$。因此,看起来$\gamma=0.95$是比较合理的。
思考问题:如果每一步都存在一个reward的话,岂不是从一开始就知道了每个action的好坏,为什么还要费尽心思利用return来评判呢?reward对于一个action的价值评判与return有什么区别?
然而,仅仅利用单次游戏某个action的return来评判action的好坏仍然是不充分的,因为在游戏过程中,一个好的action后续可能会跟随着一些坏的actions使得杆子掉落的更快,从而使得一个好的action却得到了一个较低的return。但是,如果我们进行足够多次游戏,平均意义上好的action通常会得到更高的return。在一个有效的算法当中,我们需要能够客观量化某个action在平均意义上会比其他可能的action好多少或者坏多少(动作优势,action advantage)。这一点,我们可以通过进行多个episodes并且对所有动作的return进行标准化(减去均值再除以标准差)来实现。在此之后,我们就可以合理的假设那些得到了负的advantage的action是坏的,而得到了正advantage的action是好的。现在已经得到了评估一个action的方法,接下来我们利用该方法训练一个agent。
Policy Gradients(PG)的基本思想是通过梯度下降的方法优化policy的参数从而实现更高的奖励。一类经典的PG方法是1992年提出的REINFORCE
算法,下面介绍一个REINFORCE
算法的常见变体:
Step1: 首先, 让agent根据基于神经网络的policy玩很多次游戏。并且在每一步,计算使所选action更有可能发生的梯度并保存(比如:损失函数是将被选action为groundtruth的交叉熵,参数是神经网络的权重以及偏倚参数)。但是暂时不使用这些梯度;
Step2: 一旦游戏进行了多个episodes,计算每一个action的优势(利用上一节介绍的方法);
Step3: 如果某个action的优势是正的,说明这个action很可能是好的,那么我们希望能够应用之前计算的梯度,以使该action在未来更有可能被选择。反之亦然。要实现这一点,我们只需要将每一个梯度向量乘以对应action的优势;
Step4: 最终,计算所有导致的梯度向量的均值,并且利用该均值来进行梯度下降;
现在我们使用tf.keras来实现这个算法。继续考虑小车承杆的场景,我们将利用神经网络来让杆子在小车上实现平衡。首先,我们需要一个函数来执行每一个action。不妨,我们先让每一个action选取都向右,这使得我们可以计算loss和对应的梯度。
def play_one_step(env, obs, model, loss_fn):
with tf.GradientTape() as tape:
left_proba = model(obs[np.newaxis])
action = (tf.random.uniform([1, 1]) > left_proba)
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
loss = tf.reduce_mean(loss_fn(y_target, left_proba))
grads = tape.gradient(loss, model.trainable_variables)
obs, reward, done, info = env.step(int(action[0, 0].numpy()))
return obs, reward, done, grads
代码注释:
在GradientTape模块中,首先建立模型,以observation为输入,以action为左的概率left_proba为输出。
接下来,我们在$[0,1]$区间随机采一个数,如果这个数大于left_proba,则action为向右($action=1$),否则,action为向左($action=0$)。
接下来,定义向左的目标概率(如果action为向左,则希望向左的target为1,反之亦然): $y_{target}=1-action$。
然后利用给定的损失函数计算损失,并关于模型变量计算梯度。要注意的是:这些梯度并不立即使用,而是存储起来用于判断action的真正好坏。
最后,执行所选的action,并且返回得到的obsevation,reward,以及当前episode是否结束等相关信息。
现在,依据play_one_step()函数来创建另一个函数play_multiple_episodes()来执行多个episodes,并返回每个episodes中每个step对应的reward和梯度信息:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
all_rewards = []
all_grads = []
for episode in range(n_episodes):
current_rewards = []
current_grads = []
obs = env.reset()
for step in range(n_max_steps):
obs, reward, done, grads = play_one_step(env, obs, model,loss_fn)
current_rewards.append(reward)
current_grads.append(grads)
if done:
break
all_rewards.append(current_rewards)
all_grads.append(current_grads)
return all_rewards, all_grads
play_multiple_episodes()返回了rewards的列表(每一个episode对应一个reward列表,每个列表包含了每一步的奖励信息)以及梯度列表(每个episode对应一个梯度列表,而每一个梯度列表包含了每一步的梯度信息)
def discount_rewards(rewards, discount_factor):
discounted = np.array(rewards)
for step in range(len(rewards) - 2, -1, -1):
discounted[step] += discounted[step + 1] * discount_factor
return discounted
def discount_and_normalize_rewards(all_rewards, discount_factor):
all_discounted_rewards = [discount_rewards(rewards, discount_factor)for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]
def render_policy_net(model, n_max_steps=200, seed=42):
frames = []
env = gym.make("CartPole-v1")
env.seed(seed)
np.random.seed(seed)
obs = env.reset()
for step in range(n_max_steps):
frames.append(env.render(mode="rgb_array"))
left_proba = model.predict(obs.reshape(1, -1))
action = int(np.random.rand() > left_proba)
obs, reward, done, info = env.step(action)
if done:
break
env.close()
return frames
discount_rewards([10, 0, -50], discount_factor=0.8)
discount_rewards()函数返回了折旧后的reward信息。而discount_and_normalize_rewards()函数则进一步计算出了episodes中每一个action的优势大小。注意第一个episode比第二个episode要差,因此第一个episode中每一个action的优势都是负的;也就是说第一个episode中每一个action都会被认为是不好的action。
接下来直接实现算法!首先定义超参如下:
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95
定义优化器以及损失函数:
optimizer = keras.optimizers.Adam(lr=0.01)
loss_fn = keras.losses.binary_crossentropy
建立并运行training loop:
import gym
env = gym.make("CartPole-v1")
env.seed(42);
model = keras.models.Sequential([
keras.layers.Dense(5, activation="elu", input_shape=[4]),
keras.layers.Dense(1, activation="sigmoid"),
])
for iteration in range(n_iterations):
all_rewards, all_grads = play_multiple_episodes(
env, n_episodes_per_update, n_max_steps, model, loss_fn)
total_rewards = sum(map(sum, all_rewards)) # Not shown in the book
print("\rIteration: {}, mean rewards: {:.1f}".format( # Not shown
iteration, total_rewards / n_episodes_per_update), end="") # Not shown
all_final_rewards = discount_and_normalize_rewards(all_rewards,
discount_factor)
all_mean_grads = []
for var_index in range(len(model.trainable_variables)):
mean_grads = tf.reduce_mean(
[final_reward * all_grads[episode_index][step][var_index]
for episode_index, final_rewards in enumerate(all_final_rewards)
for step, final_reward in enumerate(final_rewards)], axis=0)
all_mean_grads.append(mean_grads)
optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
env.close()
frames = render_policy_net(model)
plot_animation(frames)
代码解释:
每一次迭代中,training loop调用play_multiple_episodes()函数,玩10次游戏,并返回相应的reward和梯度信息。
然后调用 discount_and_normalize_rewards()函数来计算每一个action的优势final_reward。
接下来,利用final_reward信息为梯度信息加权取平均。
最后,利用平均梯度进行模型优化。
以上代码可以成功的学习出好的策略并在小车承杆游戏中成功将杆子平衡在小车上(即,每一个episode的reward都接近200)。
以上简单的policy gradients算法可以解决小车承杆问题,但是并不能扩展到更大更复杂的任务中。在更复杂的任务中,采样往往是不充分的,需要进行很长时间的探索才能够真正取得进展。这是因为需要跑很多个episodes才能准确评估每一个action的优势。但是policy gradients算法为后续更有效的Actor-Critic
算法的出现奠定了基础。
policy gradients算法是利用梯度下降法最大化rewards来直接优化policy。而另一种方法则不那么直接,其试图评判每一个state,每一个action的好坏,从而决定如何选择action。为了充分理解该算法的思想,我们首先引入马尔可夫决策过程。
在20世纪早期,数学家马尔可夫研究了一种无记忆的随机过程,即马尔可夫链
(Markov Chain)。马尔可夫链
只有有限个状态,其每一步都会随机地从一个状态 $s$ 转移到另一个状态 $s'$。转移概率是固定的,并且仅仅取决于状态对 $(s,s')$,而不受之前状态的影响(这就是我们说这个系统无记忆的原因)。
马尔可夫决策过程
(Markov Decision Process, MDP)在20世纪50年代由理查德·贝尔曼引入。马尔可夫决策过程
类似于马尔可夫链
,但做了一些改变:在每一步,agent首先从action空间中选取一个action, 然后再进行状态的转移,转移概率则取决于选取的action。一些状态转移会返回一些reward(正或者负),而agent的目标则是找到一个可以在时间尺度上最大化reward的policy。
贝尔曼给出了估计某个状态 $\textbf{s}$ 的最优状态值 $V^{*}(s)$ 的方法,$V^{*}(s)$ 的含义是:假设agent按照最优方式运行,其在期望意义上,可获得的所有未来奖励在折旧因子加权后的总和。贝尔曼证明了如果agent按照最优的方式运行,那么贝尔曼最优方程
满足。这个递归方程指出如果agent的行为是最优的,那么当前状态的最优值就等于它采取一次最优action后得到的平均回报,加上这次action可能导致的所有状态的期望意义上的最优值。
贝尔曼最优方程:$V^{*}(s)=\max_a{\sum_{s'}{T(s,a,s')[R(s,a,s')+\gamma\cdot V^{*}(s')]}}$, $\forall s$
方程中,$T(s,a,s')$表示agent在状态 $s$ 选取动作 $a$ 后转移到状态 $s'$ 的概率, $R(s,a,s')$ 表示该步转移获取的奖励,$\gamma$ 表示折旧因子。
这个方程给出了一个直接估计最优状态值的算法:首先将所有的状态最优值初始化为0,然后利用值迭代方程
持续优化。一个出色的结果是,给定足够的迭代次数,这些估计一定回收敛到与最优策略相关的最优状态值。
值迭代方程:$V_{k+1}(s)\leftarrow \max_a{\sum_{s'}{T(s,a,s')[R(s,a,s')+\gamma\cdot V_{k}(s')]}}$, $\forall s$
在这个方程中,$V_{k}(s)$ 是状态 $s$ 在算法的第 $k$ 次迭代的状态最优值的估计。
NOTE: 这个算法是一个典型的动态规划问题解法,其将复杂问题转化为可解的子问题进行迭代求解。
利用以上方法我们可以求出每个状态的最优状态值,这个值在评估一个policy的时候会非常有用,但却无法直接提供一个最优的policy。幸运的是,贝尔曼找到了一个类似的方法来估计最优的状态-动作值 (state-action value)
, 也就是所谓的 Q-值(Quality Values)
。一个状态-动作
对儿的Q-值
$Q^{*}(s, a)$ 的含义是在状态 $s$ 的agent选取了动作 $a$ 之后所能期望的平均未来折现回报的总和。它奏效的模式如下:
首先将所有的 Q-值
初始化为0,然后利用 Q-值迭代方程
进行更新:
Q-值迭代方程:$Q_{k+1}^{*}(s, a)\leftarrow \sum_{s'}{T(s,a,s')[R(s,a,s')+\gamma\cdot\max_{a'}{ Q_{k}^{*}(s, a')}]}$, $\forall (s', a)$
一旦我们掌握了最优的Q-值
,定义最优策略$\pi^{*}{(s)}$就非常简单了:当agent在状态 $s$ 的时候,应当选择具有最高Q-值
的动作,则是最优的动作,即 $\pi^{*}(s)=argmax_a{Q^{*}(s, a)}$。
如上所述,离散动作空间的强化学习问题可以被建模为马尔可夫决策过程(MDP),但是实际操作中,agent在初始的时候往往并不知道转移概率 $T(s,a,s')$ 和 奖励 $R(s,a,s')$ 的确切大小。因此agent至少需要经历一次完整的状态转移才会清楚具体的reward大小,而至少要进行多次完整的转移才可以较为准确的估计转移概率 $T(s,a,s')$ 的大小。
时间差分学习(Temporal Difference Learning,TD learning)算法实际上类似于值迭代算法
,不同的地方是TD Learning假设agent仅仅知道部分MDP的知识,这与我们实际中面临的情况是吻合的。因为在实际中,agent通常只知道自身可能的状态以及可能进行的动作,而不具备更多的额外知识。为了解决现实中agent知识不足的问题,我们可以采用策略探索的方法--比如采用完全随机的策略--来探索MDP的更多知识。而随着探索的进行,agent也会根据自身实际的转移经历以及获取的reward来更新对于最优状态值的估计。
TD Learning算法: $V_{k+1}(s)\leftarrow (1-\alpha)V_{k}(s)+\alpha(r+\gamma\cdot V_k(s'))$
或者,等价形式为:
$V_{k+1}(s)\leftarrow V_{k}(s)+\alpha\cdot \delta_{k}(s,r,s')$,其中 $\delta_{k}(s,r,s')=r+\gamma\cdot V_k(s')-V_{k}(s)$
其中,$\alpha$ 为学习率参数;$r+\gamma\cdot V_k(s')$ 称为TD 目标(TD target)
; $\delta_{k}(s,r,s')$ 称为TD 误差(TD error)
。为简单期间,我们通常也会把方程写为 $a\leftarrow_{\alpha}b$,其含义是$a_{k+1}\leftarrow (1-\alpha)a_k+\alpha\cdot b_k$。因此,TD Learning算法的迭代表达式也可以写为:$V(s)\leftarrow_{\alpha}r+\gamma\cdot V(s')$。
TIP: TD learning方法与随机梯度下降(SGD)有许多相似之处,尤其是两者每一次只处理一个样本。就像SGD一样,TD learning 方法只有不断降低学习率才能真正收敛。
对于每一个状态 $s$,该算法只跟踪agent离开该状态时获得的即时回报的运行平均值,加上它期望稍后获得的回报。
类似的,Q-Learning 算法也是在agent关于MDP知识不足的情况下Q-值迭代算法的变体。Q-Learning 的基本思想是让agent不断地探索,然后逐渐基于探索的知识提升关于Q-值的估计。一旦得到了一个精确的Q-值,最优策略则是在每一步选取具有最高Q-值的行为(贪心策略)。
Q-learning 算法: $ Q(s, a)\leftarrow_{\alpha}r+\gamma\cdot \max_{a'}Q(s',a')$
对于每一个“状态-行为”对儿$(s,a)$, Q-learning 算法追踪其在时间尺度上通过离开状态$s$并采取 action $a$的平均reward,加上其期望意义上折旧的未来reward。为了估计这个加和,我们取下一状态$s'$上所有Q值估计的最大值,因为我们假设目标策略是最优的。
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)
# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"
try:
# %tensorflow_version only exists in Colab.
%tensorflow_version 2.x
!apt update && apt install -y libpq-dev libsdl2-dev swig xorg-dev xvfb
!pip install -q -U tf-agents-nightly pyvirtualdisplay gym[atari]
IS_COLAB = True
except Exception:
IS_COLAB = False
# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"
# if not tf.config.list_physical_devices('GPU'):
# print("No GPU was detected. CNNs can be very slow without a GPU.")
# if IS_COLAB:
# print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
# Common imports
import numpy as np
import os
# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)
# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
# To get smooth animations
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')
# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "rl"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)
def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
print("Saving figure", fig_id)
if tight_layout:
plt.tight_layout()
plt.savefig(path, format=fig_extension, dpi=resolution)
import gym
env = gym.make("CartPole-v1")
obs = env.reset()
obs
array([-0.00046301, -0.0454752 , 0.02578857, -0.01843433])
#img=env.render(mode="rgb_array")
#img.shape
(400, 600, 3)
env.action_space
Discrete(2)
action=1
obs,reward,done,info=env.step(action)
obs
array([-0.00137251, 0.1492676 , 0.02541988, -0.3028704 ])
def basic_policy(obs):
angle=obs[2]
return 0 if angle<0 else 1
totals=[]
for episode in range(500):
episode_rewards=0
obs=env.reset()
for step in range(200):
action=basic_policy(obs)
obs, reward, done, info = env.step(action)
episode_rewards += reward
if done:
break
totals.append(episode_rewards)
import numpy as np
np.mean(totals), np.std(totals),np.mean(totals),np.max(totals)
(42.248, 8.639357383509493, 42.248, 71.0)
import tensorflow as tf
from tensorflow import keras
import numpy as np
keras.backend.clear_session()
tf.random.set_seed(42)
np.random.seed(42)
n_inputs=4 # == env.observation_space.shape[0]
model=keras.models.Sequential([
keras.layers.Dense(5,activation="elu",input_shape=[n_inputs]),
keras.layers.Dense(1,activation="sigmoid"),
])
def update_scene(num, frames, patch):
patch.set_data(frames[num])
return patch,
def plot_animation(frames, repeat=False, interval=40):
fig = plt.figure()
patch = plt.imshow(frames[0])
plt.axis('off')
anim = animation.FuncAnimation(
fig, update_scene, fargs=(frames, patch),
frames=len(frames), repeat=repeat, interval=interval)
plt.close()
return anim
def render_policy_net(model, n_max_steps=200, seed=42):
frames = []
env = gym.make("CartPole-v1")
env.seed(seed)
np.random.seed(seed)
obs = env.reset()
for step in range(n_max_steps):
frames.append(env.render(mode="rgb_array"))
left_proba = model.predict(obs.reshape(1, -1))
action = int(np.random.rand() > left_proba)
obs, reward, done, info = env.step(action)
if done:
break
env.close()
return frames
frames = render_policy_net(model)
plot_animation(frames)
n_environments = 50
n_iterations = 5000
envs = [gym.make("CartPole-v1") for _ in range(n_environments)]
for index, env in enumerate(envs):
env.seed(index)
np.random.seed(42)
observations = [env.reset() for env in envs]
optimizer = keras.optimizers.RMSprop()
loss_fn = keras.losses.binary_crossentropy
for iteration in range(n_iterations):
# if angle < 0, we want proba(left) = 1., or else proba(left) = 0.
target_probas = np.array([([1.] if obs[2] < 0 else [0.])
for obs in observations])
with tf.GradientTape() as tape:
left_probas = model(np.array(observations))
loss = tf.reduce_mean(loss_fn(target_probas, left_probas))
print("\rIteration: {}, Loss: {:.3f}".format(iteration, loss.numpy()), end="")
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
actions = (np.random.rand(n_environments, 1) > left_probas.numpy()).astype(np.int32)
for env_index, env in enumerate(envs):
obs, reward, done, info = env.step(actions[env_index][0])
observations[env_index] = obs if not done else env.reset()
for env in envs:
env.close()
frames = render_policy_net(model)
plot_animation(frames)
Iteration: 4999, Loss: 0.094
def play_one_step(env, obs, model, loss_fn):
with tf.GradientTape() as tape:
left_proba = model(obs[np.newaxis])
action = (tf.random.uniform([1, 1]) > left_proba)
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
loss = tf.reduce_mean(loss_fn(y_target, left_proba))
grads = tape.gradient(loss, model.trainable_variables)
obs, reward, done, info = env.step(int(action[0, 0].numpy()))
return obs, reward, done, grads
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
all_rewards = []
all_grads = []
for episode in range(n_episodes):
current_rewards = []
current_grads = []
obs = env.reset()
for step in range(n_max_steps):
obs, reward, done, grads = play_one_step(env, obs, model,loss_fn)
current_rewards.append(reward)
current_grads.append(grads)
if done:
break
all_rewards.append(current_rewards)
all_grads.append(current_grads)
return all_rewards, all_grads
def discount_rewards(rewards, discount_factor):
discounted = np.array(rewards)
for step in range(len(rewards) - 2, -1, -1):
discounted[step] += discounted[step + 1] * discount_factor
return discounted
def discount_and_normalize_rewards(all_rewards, discount_factor):
all_discounted_rewards = [discount_rewards(rewards, discount_factor)for rewards in all_rewards]
flat_rewards = np.concatenate(all_discounted_rewards)
reward_mean = flat_rewards.mean()
reward_std = flat_rewards.std()
return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]
def render_policy_net(model, n_max_steps=200, seed=42):
frames = []
env = gym.make("CartPole-v1")
env.seed(seed)
np.random.seed(seed)
obs = env.reset()
for step in range(n_max_steps):
frames.append(env.render(mode="rgb_array"))
left_proba = model.predict(obs.reshape(1, -1))
action = int(np.random.rand() > left_proba)
obs, reward, done, info = env.step(action)
if done:
break
env.close()
return frames
discount_rewards([10, 0, -50], discount_factor=0.8)
array([-22, -40, -50])
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95
optimizer = keras.optimizers.Adam(lr=0.01)
loss_fn = keras.losses.binary_crossentropy
import gym
env = gym.make("CartPole-v1")
env.seed(42);
model = keras.models.Sequential([
keras.layers.Dense(5, activation="elu", input_shape=[4]),
keras.layers.Dense(1, activation="sigmoid"),
])
for iteration in range(n_iterations):
all_rewards, all_grads = play_multiple_episodes(
env, n_episodes_per_update, n_max_steps, model, loss_fn)
total_rewards = sum(map(sum, all_rewards)) # Not shown in the book
print("\rIteration: {}, mean rewards: {:.1f}".format( # Not shown
iteration, total_rewards / n_episodes_per_update), end="") # Not shown
all_final_rewards = discount_and_normalize_rewards(all_rewards,
discount_factor)
all_mean_grads = []
for var_index in range(len(model.trainable_variables)):
mean_grads = tf.reduce_mean(
[final_reward * all_grads[episode_index][step][var_index]
for episode_index, final_rewards in enumerate(all_final_rewards)
for step, final_reward in enumerate(final_rewards)], axis=0)
all_mean_grads.append(mean_grads)
optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
env.close()
frames = render_policy_net(model)
plot_animation(frames)
Iteration: 149, mean rewards: 170.4